/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.dq.core.service.impl.function.system;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import javax.annotation.Nonnull;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.unidata.mdm.core.type.data.ArrayAttribute;
import org.unidata.mdm.core.type.data.ArrayAttribute.ArrayDataType;
import org.unidata.mdm.core.type.data.ArrayValue;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.data.Attribute.AttributeType;
import org.unidata.mdm.core.type.data.CodeAttribute;
import org.unidata.mdm.core.type.data.CodeAttribute.CodeDataType;
import org.unidata.mdm.core.type.data.SimpleAttribute;
import org.unidata.mdm.core.type.data.SimpleAttribute.SimpleDataType;
import org.unidata.mdm.core.type.data.SingleValueAttribute;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.io.DataQualityError;
import org.unidata.mdm.dq.core.type.io.DataQualitySpot;
import org.unidata.mdm.dq.core.type.rule.SeverityIndicator;
import org.unidata.mdm.dq.core.util.DQUtils;
import org.unidata.mdm.system.util.TextUtils;

/**
 * Basic SYSTEM cleanse function class.
 *
 * @author ilya.bykov
 */
public abstract class AbstractSystemCleanseFunction implements CleanseFunction {
    /**
     * Name of the function.
     */
    private final String name;
    /**
     * Display name.
     */
    private final Supplier<String> displayName;
    /**
     * Description.
     */
    private final Supplier<String> description;

    /**
     * Instantiates a new cleanse function abstract.
     */
    protected AbstractSystemCleanseFunction(String name, Supplier<String> displayName, Supplier<String> description) {
        super();
        this.name = name;
        this.displayName = displayName;
        this.description = description;
    }
    /**
     * @return the name
     */
    public String getName() {
        return name;
    }
    /**
     * @return the displayName
     */
    public String getDisplayName() {
        return displayName.get();
    }
    /**
     * @return the description
     */
    public String getDescription() {
        return description.get();
    }
    /**
     * Checks, that all supplied parameters are singletons, adding an error to the result, if this is not the case.
     * @param ctx the context to add errors to
     * @param result the CF result
     * @param params the parameters to check
     * @return true, if all singletons, false otherwise
     */
    protected boolean ensureAllSingletons(CleanseFunctionContext ctx, CleanseFunctionResult result, CleanseFunctionInputParam... params) {

        List<DataQualitySpot> paths = null;
        for (int i = 0; params != null && i < params.length;  i++) {

            CleanseFunctionInputParam param = params[i];
            if (Objects.isNull(param)) {
                continue;
            }

            if (!param.isEmpty() && !param.isSingleton()) {

                if (Objects.isNull(paths)) {
                    paths = new ArrayList<>(8);
                }

                paths.addAll(param.getAttributes().stream()
                        .map(attr -> new DataQualitySpot(attr.toLocalPath(), attr))
                        .collect(Collectors.toList()));
            }
        }

        if (CollectionUtils.isNotEmpty(paths)) {

            result.addSpots(paths);
            result.addError(DataQualityError.builder()
                    .ruleName(ctx.getRuleName())
                    .functionName(getName())
                    .category(DQUtils.CATEGORY_SYSTEM)
                    .message(TextUtils.getText("app.dq.multiple.attributes.filtered", ctx.getRuleName(), getName()))
                    .severity(SeverityIndicator.RED)
                    .build());

            return false;
        }

        return true;
    }
    /**
     * Extracts and maps values to holder attributes.
     * @param input the input param
     * @return map
     */
    @Nonnull
    protected Map<Object, List<Attribute>> extractAndMapAttributes(CleanseFunctionInputParam input) {

        if (Objects.isNull(input) || input.isEmpty()) {
            return Collections.emptyMap();
        }

        Map<Object, List<Attribute>> filtered = new HashMap<>(input.getAttributes().size());
        for (Attribute attribute : input.getAttributes()) {

            switch (attribute.getAttributeType()) {
            case ARRAY:
                ArrayAttribute<?> array = attribute.narrow();
                array.forEach(av -> {
                    if (shouldCollect(av, array.getDataType())) {
                        filtered.computeIfAbsent(av.getValue(), key -> new ArrayList<>()).add(attribute);
                    }
                });
                break;
            case CODE:
            case SIMPLE:

                SingleValueAttribute<?> single = attribute.narrow();
                if (shouldCollect(single)) {
                    filtered.computeIfAbsent(single.getValue(), key -> new ArrayList<>()).add(attribute);
                }

                break;
            default:
                break;
            }
        }

        return filtered;
    }

    private boolean shouldCollect(ArrayValue<?> av, ArrayDataType type) {
        return !(Objects.isNull(av.getValue()) || (type == ArrayDataType.STRING && StringUtils.isBlank(av.castValue())));
    }

    private boolean shouldCollect(SingleValueAttribute<?> single) {

        if (Objects.isNull(single) || single.isEmpty()) {
            return false;
        }

        boolean isString =
                (single.getAttributeType() == AttributeType.CODE && ((CodeAttribute<?>) single).getDataType() == CodeDataType.STRING)
             || (single.getAttributeType() == AttributeType.SIMPLE && ((SimpleAttribute<?>) single).getDataType() == SimpleDataType.STRING);

        return !isString || StringUtils.isNotBlank(single.castValue());
    }
}
